fix(transport): resolve SSE stream timeout in JdkHttpTransport #1322
jujn wants to merge 4 commits
LearningGp left a comment
`future.cancel(true)` does not guarantee that the underlying socket is closed immediately under HTTP/2 or HTTP/1.1 keep-alive (see JDK-8245462).
`Flux.using` already handles cleanup during the stream phase, but there is no cleanup hook for the intermediate window when a timeout fires before the stream starts. In that case the connection is returned to the pool with unconsumed data, which pollutes later reuse.
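One way to cover that intermediate window, sketched here with JDK types only: register a completion callback on the pending future so that a body arriving after the caller has given up is closed rather than leaked. `LateBodyGuard` and its method names are hypothetical, and a `CompletableFuture<InputStream>` stands in for the result of `HttpClient.sendAsync(...)`; this is an illustration, not the PR's implementation.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: closes a response body that arrives after the caller gave up. */
final class LateBodyGuard {
    private final AtomicBoolean abandoned = new AtomicBoolean(false);
    private final CompletableFuture<InputStream> pending;

    LateBodyGuard(CompletableFuture<InputStream> pending) {
        this.pending = pending;
        // Fires on completion, including completion that happens after abandon().
        pending.whenComplete((body, error) -> {
            if (body != null && abandoned.get()) {
                closeQuietly(body);
            }
        });
    }

    /** Called on timeout or downstream cancel, before the stream phase starts. */
    void abandon() {
        abandoned.set(true);
        // Covers the race where the body arrived just before the flag was set.
        if (pending.isDone() && !pending.isCompletedExceptionally()) {
            closeQuietly(pending.join());
        }
    }

    private static void closeQuietly(InputStream body) {
        if (body == null) {
            return;
        }
        try {
            body.close();
        } catch (IOException ignored) {
            // Nothing useful to do during cleanup.
        }
    }
}
```

A body may be closed twice if both paths run, which is harmless for streams whose `close()` is idempotent. A real transport would presumably still cancel the pending future as a best-effort step; the callback only covers the case where cancellation loses the race with completion.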
Pull request overview
This PR updates the JDK-based HTTP transport to avoid absolute request timeouts for SSE/NDJSON streaming, replacing them with Reactor-managed response/idle timeouts and improving cancellation cleanup to better support long-lived LLM streams (e.g., long TTFT).
Changes:
- Add `responseTimeout` and `streamIdleTimeout` to `HttpTransportConfig` to control TTFT and inter-chunk idle gaps for streaming.
- Rework `JdkHttpTransport.stream(...)` to remove JDK `HttpRequest.timeout()` from streaming requests and enforce streaming timeouts via Reactor, with more robust cancellation/cleanup behavior.
- Expand `JdkHttpTransportTest` coverage for long-TTFT streams, idle timeouts, and late async completion cleanup.
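To illustrate how the two settings differ, the sketch below checks a sequence of chunk arrival times against a first-chunk limit and an inter-chunk limit. It is plain Java rather than Reactor, and `StreamTimeoutCheck`, `firstViolation`, and the sample durations are illustrative assumptions, not code from the PR.

```java
import java.time.Duration;
import java.util.List;

/** Hypothetical model of the two streaming timeouts; not the transport's code. */
final class StreamTimeoutCheck {
    /**
     * Arrival times are offsets from request start, in ascending order.
     * Returns "response" if the first chunk arrives later than responseTimeout,
     * "idle" if a gap between consecutive chunks exceeds streamIdleTimeout,
     * and "ok" otherwise.
     */
    static String firstViolation(
            List<Duration> arrivals, Duration responseTimeout, Duration streamIdleTimeout) {
        Duration previous = null;
        for (Duration arrival : arrivals) {
            if (previous == null) {
                // First chunk: only the TTFT limit applies.
                if (arrival.compareTo(responseTimeout) > 0) {
                    return "response";
                }
            } else if (arrival.minus(previous).compareTo(streamIdleTimeout) > 0) {
                // Later chunks: only the gap since the previous chunk matters.
                return "idle";
            }
            previous = arrival;
        }
        return "ok";
    }
}
```

With a 120 s `responseTimeout` and a 30 s `streamIdleTimeout`, a stream whose chunks arrive at 90 s, 95 s, and 100 s passes, even though its total duration would exceed an absolute 60 s request timeout.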
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java | Removes JDK request timeout for streaming and implements Reactor-based response/idle timeouts with improved cancellation/cleanup. |
| agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java | Introduces new streaming timeout configuration fields and updates timeout documentation. |
| agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java | Adds regression tests validating streaming timeout behavior and resource cleanup. |
Comments suppressed due to low confidence (1)
agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java:278
- The stream-level `.timeout(Mono.delay(streamResponseTimeout()), ...)` restarts the `responseTimeout` window after headers are received (since the earlier `.timeout(...)` in `stream()` already waited for the response to complete). If `responseTimeout` is intended to be a TTFT from request start, this results in a longer-than-configured effective timeout. Either clarify that `responseTimeout` is per-phase (headers and first chunk), or restructure to enforce a single TTFT window from subscription start through the first emitted data.
return processStreamResponse(inputStream, request)
.timeout(
// Timeout strategy 1: Time To First Token (TTFT).
// The maximum time to wait for the first piece of data after headers.
Mono.delay(streamResponseTimeout()),
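The single-window alternative mentioned in the comment above can be sketched as deadline arithmetic: record the start time once and give each later phase only the time that remains. `TtftBudget`, `remaining`, and `perPhaseWorstCase` are hypothetical names, and the sketch assumes exactly two phases (headers, then first chunk).

```java
import java.time.Duration;

/** Hypothetical helper contrasting one shared TTFT window with per-phase windows. */
final class TtftBudget {
    /** Time left in a single TTFT window that began at request start; never negative. */
    static Duration remaining(Duration responseTimeout, Duration elapsedSinceStart) {
        Duration left = responseTimeout.minus(elapsedSinceStart);
        return left.isNegative() ? Duration.ZERO : left;
    }

    /** Upper bound on the wait when headers and first chunk each get a fresh window. */
    static Duration perPhaseWorstCase(Duration responseTimeout) {
        return responseTimeout.multipliedBy(2);
    }
}
```

For example, with a 60 s `responseTimeout` and headers received after 45 s, a shared window leaves 15 s for the first chunk, whereas a restarted window allows up to another 60 s.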
Description
Close #1302
This PR fixes `JdkHttpTransport` streaming behavior for long-running SSE/NDJSON responses, especially LLM streams with long Time-To-First-Token (TTFT). Previously, JDK `HttpRequest.timeout()` used absolute timeout semantics for streaming requests, which could terminate healthy long-lived streams. In addition, blocking stream reads and error-body reads could run on the JDK HttpClient callback path, and cancellation/timeout did not reliably close response bodies in all intermediate states.

Key Changes
- Removed absolute JDK request timeout for streaming requests
  - `HttpRequest.timeout()` is still applied to non-streaming requests.
- Added Reactor-managed streaming timeouts
  - `responseTimeout`: maximum time to wait for response headers / first stream data.
  - `streamIdleTimeout`: maximum gap between emitted stream chunks.
- Hardened cancellation and timeout cleanup
  - … `Mono.fromFuture(...)` flow with an explicit cancellation-aware wrapper around `HttpClient.sendAsync(...)`.
  - … `InputStream` as soon as it becomes available.
- Prevented blocking work on JDK HttpClient callback threads
  - … `Schedulers.boundedElastic()`.
  - … `boundedElastic()` to avoid blocking the JDK HttpClient internal executor.
- Expanded regression coverage
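A minimal sketch of the first change, assuming a hypothetical `buildRequest` helper: the JDK request timeout is set only when the call is not streaming, so a streaming request carries no JDK-level timeout and relies on the Reactor-managed timeouts instead. The class name, the `GET` method, and the parameters are placeholders rather than the transport's actual code.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

/** Hypothetical request factory; not the transport's real builder logic. */
final class RequestTimeouts {
    static HttpRequest buildRequest(URI uri, boolean streaming, Duration requestTimeout) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(uri).GET();
        if (!streaming) {
            // Absolute timeout: suitable only for bounded, non-streaming exchanges.
            builder.timeout(requestTimeout);
        }
        return builder.build();
    }
}
```

`HttpRequest.timeout()` returns an `Optional<Duration>`, so a test can verify that it is empty for streaming requests and present otherwise.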
Checklist
Please check the following items before code is ready to be reviewed.
- `mvn spotless:apply`
- `mvn test`